package org.developer.hotitems_analysis

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

/**
 * Utility that replays a text file into a Kafka topic, one message per line.
 *
 * NOTE(review): this object shares its simple name with the imported
 * `org.apache.kafka.clients.producer.KafkaProducer` class. Inside this object
 * the name is only used in type position (`new KafkaProducer[...]`), which
 * refers to the imported class; consider renaming the object to avoid
 * confusion. The name is kept here so existing callers keep working.
 */
object KafkaProducer {

  /** Broker list used when the caller does not supply one. */
  private val DefaultBootstrapServers: String = "localhost:9092"

  /** Input file used when the caller does not supply one. */
  private val DefaultInputPath: String =
    "D:\\idea\\projects\\gitee\\userBehaviorAnalysis\\HotitemsAnalysis\\src\\main\\resources\\UserBehavior.csv"

  /**
   * Entry point. Sends the default input file to the topic named by the first
   * command-line argument; prints a usage hint to stderr when no topic is given.
   *
   * @param args command-line arguments; `args(0)` is the target topic
   */
  def main(args: Array[String]): Unit = {
    args.headOption match {
      case Some(topic) => writeToKafka(topic)
      case None        => Console.err.println("Usage: KafkaProducer <topic>")
    }
  }

  /**
   * Reads `filePath` line by line and sends every line as the value of a
   * key-less record to `topic`.
   *
   * The file handle and the producer are both released even if reading or
   * sending throws.
   *
   * @param topic            Kafka topic the records are sent to
   * @param filePath         path of the file whose lines are sent; defaults to
   *                         the UserBehavior.csv resource path used by this project
   * @param bootstrapServers value for the producer's `bootstrap.servers` setting
   */
  def writeToKafka(
      topic: String,
      filePath: String = DefaultInputPath,
      bootstrapServers: String = DefaultBootstrapServers
  ): Unit = {

    val properties = new Properties()
    // Kafka configuration keys are case-sensitive: the key must be spelled
    // exactly "bootstrap.servers", otherwise the producer cannot be created.
    properties.setProperty("bootstrap.servers", bootstrapServers)
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // Create the Kafka producer.
    val producer = new KafkaProducer[String, String](properties)
    try {
      // Read the data from the file and send it, one record per line.
      val bufferedSource = scala.io.Source.fromFile(filePath)
      try {
        bufferedSource.getLines().foreach { line =>
          producer.send(new ProducerRecord[String, String](topic, line))
        }
      } finally {
        // Release the file handle whether or not sending succeeded.
        bufferedSource.close()
      }
    } finally {
      // Always close the producer so its resources are released, even when
      // the file could not be opened or read.
      producer.close()
    }
  }
}
